package com.danan.data_loader.util;

import com.ververica.cdc.connectors.base.config.SourceConfig;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Created with IntelliJ IDEA.
 *
 * @Author: NanHuang
 * @Date: 2023/05/29/16:40
 * @Description:
 */
public class SourceUtil {

    public static DataStreamSource<String> getDataStreamSource(StreamExecutionEnvironment env,String schema){
        KafkaSource<String> kafkaSource = null;
        switch (ConfigUtil.getProperty("kafka.consumer.model")){
            case "earliest":
                kafkaSource = KafkaSource.<String>builder()
                        .setBootstrapServers(ConfigUtil.getProperty("kafka.bootstrap.server"))
                        .setTopics(String.format("ods_%s_%s",ConfigUtil.getProperty("data.source.database"),schema.toLowerCase()))
                        .setGroupId(schema)
                        .setStartingOffsets(OffsetsInitializer.earliest())
                        .setValueOnlyDeserializer(new SimpleStringSchema())
                        .build();
                break;
            case "latest":
                kafkaSource = KafkaSource.<String>builder()
                        .setBootstrapServers(ConfigUtil.getProperty("kafka.bootstrap.server"))
                        .setTopics(String.format("ods_%s_%s",ConfigUtil.getProperty("data.source.database"),schema.toLowerCase()))
                        .setGroupId(schema)
                        .setStartingOffsets(OffsetsInitializer.latest())
                        .setValueOnlyDeserializer(new SimpleStringSchema())
                        .build();
                break;
            default:
                throw new RuntimeException("The consumption model is not defined. Please select a consumption model from the list [ earliest , latest ]");
        }
        return env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(),"kafka-source");
    }

}
